/*******************************************************************************
 * Copyright (c) 2022, 2025 Eurotech and/or its affiliates and others
 *
 * This program and the accompanying materials are made
 * available under the terms of the Eclipse Public License 2.0
 * which is available at https://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *
 * Contributors:
 *  Eurotech
 *******************************************************************************/
package org.eclipse.kura.container.orchestration.provider.impl;

import static java.util.Objects.isNull;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

import org.eclipse.kura.KuraErrorCode;
import org.eclipse.kura.KuraException;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.container.orchestration.ContainerConfiguration;
import org.eclipse.kura.container.orchestration.ContainerInstanceDescriptor;
import org.eclipse.kura.container.orchestration.ContainerOrchestrationService;
import org.eclipse.kura.container.orchestration.ContainerState;
import org.eclipse.kura.container.orchestration.ImageConfiguration;
import org.eclipse.kura.container.orchestration.ImageInstanceDescriptor;
import org.eclipse.kura.container.orchestration.ImageInstanceDescriptor.ImageInstanceDescriptorBuilder;
import org.eclipse.kura.container.orchestration.PasswordRegistryCredentials;
import org.eclipse.kura.container.orchestration.PortInternetProtocol;
import org.eclipse.kura.container.orchestration.RegistryCredentials;
import org.eclipse.kura.container.orchestration.listener.ContainerOrchestrationServiceListener;
import org.eclipse.kura.container.orchestration.provider.impl.enforcement.AllowlistEnforcementMonitor;
import org.eclipse.kura.crypto.CryptoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerCmd;
import com.github.dockerjava.api.command.InspectImageResponse;
import com.github.dockerjava.api.command.PullImageCmd;
import com.github.dockerjava.api.command.PullImageResultCallback;
import com.github.dockerjava.api.model.AuthConfig;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.api.model.ContainerPort;
import com.github.dockerjava.api.model.Device;
import com.github.dockerjava.api.model.DeviceRequest;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.Image;
import com.github.dockerjava.api.model.InternetProtocol;
import com.github.dockerjava.api.model.LogConfig;
import com.github.dockerjava.api.model.LogConfig.LoggingType;
import com.github.dockerjava.api.model.Ports;
import com.github.dockerjava.api.model.Ports.Binding;
import com.github.dockerjava.api.model.PullResponseItem;
import com.github.dockerjava.api.model.RestartPolicy;
import com.github.dockerjava.api.model.Volume;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.DockerClientImpl;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import com.github.dockerjava.transport.DockerHttpClient;
import com.google.common.collect.ImmutableList;

public class ContainerOrchestrationServiceImpl implements ConfigurableComponent, ContainerOrchestrationService {

    private static final String PARAMETER_CANNOT_BE_NULL = "The provided parameter cannot be null";
    private static final String UNABLE_TO_CONNECT_TO_DOCKER_CLI = "Unable to connect to docker cli";
    private static final Logger logger = LoggerFactory.getLogger(ContainerOrchestrationServiceImpl.class);
    private static final String APP_ID = "org.eclipse.kura.container.orchestration.provider.ConfigurableDocker";

    private ContainerOrchestrationServiceOptions currentConfig;

    private final Set<ContainerOrchestrationServiceListener> dockerServiceListeners = new CopyOnWriteArraySet<>();
    private final Set<FrameworkManagedContainer> frameworkManagedContainers = new CopyOnWriteArraySet<>();

    private DockerClient dockerClient;
    private CryptoService cryptoService;

    private AllowlistEnforcementMonitor allowlistEnforcementMonitor;

    private Map<String, String> containerInstancesDigests = new HashMap<>();

    public void setDockerClient(DockerClient dockerClient) {
        this.dockerClient = dockerClient;
    }

    public void setCryptoService(CryptoService cryptoService) {
        this.cryptoService = cryptoService;
    }

    public void activate(Map<String, Object> properties) {
        logger.info("Bundle {} is starting with config!", APP_ID);
        if (!isNull(properties)) {
            updated(properties);
        }
        logger.info("Bundle {} has started with config!", APP_ID);
    }

    public void deactivate() {
        logger.info("Bundle {} is stopping!", APP_ID);
        if (testConnection()) {
            disconnect();
        }
        logger.info("Bundle {} has stopped!", APP_ID);
    }

    public void updated(Map<String, Object> properties) {
        logger.info("Bundle {} is updating with config!", APP_ID);
        ContainerOrchestrationServiceOptions newProps = new ContainerOrchestrationServiceOptions(properties);
        if (!newProps.equals(this.currentConfig)) {

            this.currentConfig = newProps;

            logger.info("Connecting to docker ");

            if (!this.currentConfig.isEnabled()) {
                cleanUpDocker();
                return;
            }

            if (this.allowlistEnforcementMonitor != null) {
                closeEnforcementMonitor();
            }

            connect();

            if (!testConnection()) {
                logger.error("Could not connect to docker CLI.");
                return;
            }
            logger.info("Connection Successful");

            if (currentConfig.isEnforcementEnabled()) {
                try {
                    startEnforcementMonitor();
                } catch (Exception ex) {
                    logger.error("Error starting enforcement monitor. Due to {}", ex.getMessage());
                    closeEnforcementMonitor();
                    logger.warn("Enforcement won't be active.");
                }
                enforceAlreadyRunningContainer();
            }
        }

        logger.info("Bundle {} has updated with config!", APP_ID);
    }

    private void startEnforcementMonitor() {
        logger.info("Enforcement monitor starting...");
        this.allowlistEnforcementMonitor = this.dockerClient.eventsCmd().withEventFilter("start")
                .exec(new AllowlistEnforcementMonitor(currentConfig.getEnforcementAllowlist(), this));
        logger.info("Enforcement monitor starting...done.");
    }

    private void closeEnforcementMonitor() {

        if (this.allowlistEnforcementMonitor == null) {
            return;
        }

        try {
            logger.info("Enforcement monitor closing...");
            this.allowlistEnforcementMonitor.close();
            this.allowlistEnforcementMonitor.awaitCompletion(5, TimeUnit.SECONDS);
            this.allowlistEnforcementMonitor = null;
            logger.info("Enforcement monitor closing...done.");
        } catch (InterruptedException ex) {
            logger.error("Waited too long to close enforcement monitor, stopping it...", ex);
            Thread.currentThread().interrupt();
        } catch (IOException ex) {
            logger.error("Failed to close enforcement monitor, stopping it...", ex);
        }
    }

    private void enforceAlreadyRunningContainer() {
        if (this.allowlistEnforcementMonitor == null) {
            logger.warn("Enforcement wasn't started. Check on running containers will not be performed.");
            return;
        }
        logger.info("Enforcement check on already running containers...");
        this.allowlistEnforcementMonitor.enforceAllowlistFor(listContainerDescriptors());
        logger.info("Enforcement check on already running containers...done");
    }

    @Override
    public List<String> listContainersIds() {
        if (!testConnection()) {
            throw new IllegalStateException(UNABLE_TO_CONNECT_TO_DOCKER_CLI);
        }

        List<Container> containers = this.dockerClient.listContainersCmd().withShowAll(true).exec();

        List<String> result = new ArrayList<>();

        for (Container cont : containers) {
            result.add(cont.getId());
        }

        return result;

    }

    @Override
    public List<ContainerInstanceDescriptor> listContainerDescriptors() {
        if (!testConnection()) {
            throw new IllegalStateException(UNABLE_TO_CONNECT_TO_DOCKER_CLI);
        }

        List<Container> containers = this.dockerClient.listContainersCmd().withShowAll(true).exec();

        List<ContainerInstanceDescriptor> result = new ArrayList<>();
        containers.forEach(container -> result.add(ContainerInstanceDescriptor.builder()
                .setContainerName(getContainerName(container)).setContainerImage(getContainerTag(container))
                .setContainerImageTag(getContainerVersion(container)).setContainerID(container.getId())
                .setContainerPorts(parseContainerPortsList(container.getPorts()))
                .setContainerState(convertDockerStateToFrameworkState(container.getState()))
                .setFrameworkManaged(isFrameworkManaged(container)).build()));

        return result;

    }

    private Boolean isFrameworkManaged(Container container) {
        String containerName = getContainerName(container);
        return this.frameworkManagedContainers.stream().anyMatch(c -> c.name.equals(containerName));
    }

    private String getContainerName(Container container) {
        return container.getNames()[0].replace("/", "");
    }

    private String getContainerVersion(Container container) {
        String version = "";
        String[] image = container.getImage().split(":");
        if (image.length > 1 && !image[0].startsWith("sha256")) {
            version = image[1];
        }
        return version;
    }

    private String getContainerTag(Container container) {
        String[] image = container.getImage().split(":");
        if (image[0].startsWith("sha256")) {
            return "none";
        } else {
            return image[0];
        }
    }

    private List<org.eclipse.kura.container.orchestration.ContainerPort> parseContainerPortsList(
            ContainerPort[] ports) {

        List<org.eclipse.kura.container.orchestration.ContainerPort> kuraContainerPorts = new ArrayList<>();

        Arrays.asList(ports).stream().forEach(containerPort -> {
            String ipTest = containerPort.getIp();
            if (ipTest != null && (ipTest.equals("::") || ipTest.equals("0.0.0.0"))) {
                kuraContainerPorts
                        .add(new org.eclipse.kura.container.orchestration.ContainerPort(containerPort.getPrivatePort(),
                                containerPort.getPublicPort(), parsePortInternetProtocol(containerPort.getType())));
            }
        });

        return kuraContainerPorts;
    }

    private PortInternetProtocol parsePortInternetProtocol(String dockerPortProtocol) {

        switch (dockerPortProtocol) {

        case "tcp":
            return PortInternetProtocol.TCP;

        case "udp":
            return PortInternetProtocol.UDP;

        case "sctp":
            return PortInternetProtocol.SCTP;

        default:
            throw new IllegalStateException();

        }

    }

    private ContainerState convertDockerStateToFrameworkState(String dockerState) {

        switch (dockerState.trim()) {
        case "created":
            return ContainerState.INSTALLED;
        case "restarting":
            return ContainerState.INSTALLED;
        case "running":
            return ContainerState.ACTIVE;
        case "paused":
            return ContainerState.STOPPING;
        case "exited":
            return ContainerState.STOPPING;
        case "dead":
            return ContainerState.FAILED;
        default:
            return ContainerState.INSTALLED;
        }
    }

    @Override
    public Optional<String> getContainerIdByName(String name) {
        checkRequestEnv(name);

        List<Container> containers = this.dockerClient.listContainersCmd().withShowAll(true).exec();

        for (Container cont : containers) {
            String[] containerNames = cont.getNames();
            for (String containerName : containerNames) {
                if (containerName.equals("/" + name)) { // docker API seems to put a '/' in front of the names
                    return Optional.of(cont.getId());
                }
            }
        }

        return Optional.empty();
    }

    @Override
    public void startContainer(String id) throws KuraException {
        checkRequestEnv(id);
        try {
            this.dockerClient.startContainerCmd(id).exec();
        } catch (Exception e) {
            logger.error("Could not start container {}. It could be already running or not exist at all. Caused by {}",
                    id, e);
            throw new KuraException(KuraErrorCode.OS_COMMAND_ERROR);
        }
    }

    @Override
    public String startContainer(ContainerConfiguration container) throws KuraException, InterruptedException {
        checkRequestEnv(container);

        logger.info("Starting {} Microservice", container.getContainerName());

        final Optional<ContainerInstanceDescriptor> existingInstance = listContainerDescriptors().stream()
                .filter(c -> c.getContainerName().equals(container.getContainerName())).findAny();

        String containerId;

        if (existingInstance.isPresent()) {

            if (existingInstance.get().getContainerState() == ContainerState.ACTIVE) {
                logger.info("Found already existing running container");
                containerId = existingInstance.get().getContainerId();
            } else {
                logger.info("Found already exisiting not running container, recreating it..");
                containerId = existingInstance.get().getContainerId();
                deleteContainer(containerId);
                pullImage(container.getImageConfiguration());
                containerId = createContainer(container);
                startContainer(containerId);

            }
        } else {
            logger.info("Creating new container instance");
            pullImage(container.getImageConfiguration());
            containerId = createContainer(container);
            addContainerInstanceDigest(containerId, container.getEnforcementDigest());
            startContainer(containerId);
        }

        logger.info("Container Started Successfully");

        if (container.isFrameworkManaged()) {
            this.frameworkManagedContainers
                    .add(new FrameworkManagedContainer(container.getContainerName(), containerId));
        }

        return containerId;
    }

    @Override
    public void stopContainer(String id) throws KuraException {
        checkRequestEnv(id);
        try {

            if (listContainersIds().contains(id)) {
                this.dockerClient.stopContainerCmd(id).exec();
            }

        } catch (Exception e) {
            logger.error("Could not stop container {}. Caused by {}", id, e);
            throw new KuraException(KuraErrorCode.OS_COMMAND_ERROR);
        }
    }

    @Override
    public void deleteContainer(String id) throws KuraException {
        checkRequestEnv(id);
        try {

            if (listContainersIds().contains(id)) {
                this.dockerClient.removeContainerCmd(id).exec();
            }

            this.frameworkManagedContainers.removeIf(c -> id.equals(c.id));

            removeContainerInstanceDigest(id);

        } catch (Exception e) {
            logger.error("Could not remove container {}. Caused by {}", id, e);
            throw new KuraException(KuraErrorCode.OS_COMMAND_ERROR);
        }
    }

    private void checkRequestEnv(Object parameter) {
        if (isNull(parameter)) {
            throw new IllegalArgumentException(PARAMETER_CANNOT_BE_NULL);
        }
        if (!testConnection()) {
            throw new IllegalStateException(UNABLE_TO_CONNECT_TO_DOCKER_CLI);
        }
    }

    @Override
    public void registerListener(ContainerOrchestrationServiceListener dockerListener) {
        this.dockerServiceListeners.add(dockerListener);

    }

    @Override
    public void unregisterListener(ContainerOrchestrationServiceListener dockerListener) {
        this.dockerServiceListeners.remove(dockerListener);

    }

    private void imagePullHelper(String imageName, String imageTag, int timeOutSeconds,
            Optional<RegistryCredentials> repositoryCredentials) throws InterruptedException, KuraException {

        logger.info("Attempting to pull image: {}.", imageName);
        PullImageCmd pullRequest = this.dockerClient.pullImageCmd(imageName).withTag(imageTag);

        if (repositoryCredentials.isPresent()) {
            doAuthenticate(repositoryCredentials.get(), pullRequest);
        }

        pullRequest.exec(new PullImageResultCallback() {

            @Override
            public void onNext(PullResponseItem item) {
                super.onNext(item);
                createLoggerMessageForContainerPull(item, imageName, imageTag);

            }

        }).awaitCompletion(timeOutSeconds, TimeUnit.SECONDS);

    }

    private void doAuthenticate(RegistryCredentials repositoryCredentials, PullImageCmd pullRequest)
            throws KuraException {
        if (!(repositoryCredentials instanceof PasswordRegistryCredentials)) {
            throw new KuraException(KuraErrorCode.BAD_REQUEST);
        }

        PasswordRegistryCredentials repositoryPasswordCredentials = (PasswordRegistryCredentials) repositoryCredentials;

        AuthConfig authConfig = new AuthConfig().withUsername(repositoryPasswordCredentials.getUsername()).withPassword(
                new String(this.cryptoService.decryptAes(repositoryPasswordCredentials.getPassword().getPassword())));
        Optional<String> url = repositoryPasswordCredentials.getUrl();
        if (url.isPresent()) {
            logger.info("Attempting to sign into repo: {}", url.get());
            authConfig = authConfig.withRegistryAddress(url.get());
        }
        pullRequest.withAuthConfig(authConfig);
    }

    private void createLoggerMessageForContainerPull(PullResponseItem item, String imageName, String imageTag) {

        if (logger.isDebugEnabled()) {
            logger.debug("Pulling {}:{} Layer {}, State: {}", imageName, imageTag, item.getId(), item.getStatus());
        }

        if (item.isErrorIndicated()) {
            logger.error("Unable To Pull image {}:{} because : {}", item.getErrorDetail(), imageName, imageTag);
        }

        if (item.isPullSuccessIndicated()) {
            logger.info("Image pull of {}:{}, Layer: {}, was successful", imageName, imageTag, item.getId());
        }
    }

    private String createContainer(ContainerConfiguration containerDescription) throws KuraException {
        if (!testConnection()) {
            throw new IllegalStateException("failed to reach docker engine");
        }

        if (containerDescription == null) {
            throw new IllegalStateException("failed to create container, null containerImage passed");
        }

        String containerImageFullString = String.format("%s:%s",
                containerDescription.getImageConfiguration().getImageName(),
                containerDescription.getImageConfiguration().getImageTag());

        CreateContainerCmd commandBuilder = null;
        try {
            commandBuilder = this.dockerClient.createContainerCmd(containerImageFullString);

            if (containerDescription.getContainerName() != null) {
                commandBuilder = commandBuilder.withName(containerDescription.getContainerName());
            }

            HostConfig configuration = new HostConfig();

            commandBuilder = containerEnviromentVariablesHandler(containerDescription, commandBuilder);

            commandBuilder = containerEntrypointHandler(containerDescription, commandBuilder);

            // Host Configuration Related
            configuration = containerVolumeMangamentHandler(containerDescription, configuration);

            configuration = containerDevicesHandler(containerDescription, configuration);

            if (containerDescription.getRestartOnFailure()) {
                configuration = configuration.withRestartPolicy(RestartPolicy.unlessStoppedRestart());
            }

            configuration = containerPortManagementHandler(containerDescription, configuration, commandBuilder);

            configuration = containerLogConfigurationHandler(containerDescription, configuration);

            configuration = containerNetworkConfigurationHandler(containerDescription, configuration);

            configuration = containerMemoryConfigurationHandler(containerDescription, configuration);

            configuration = containerCpusConfigurationHandler(containerDescription, configuration);

            configuration = containerGpusConfigurationHandler(containerDescription, configuration);

            configuration = containerRuntimeConfigurationHandler(containerDescription, configuration);

            if (containerDescription.isContainerPrivileged()) {
                configuration = configuration.withPrivileged(containerDescription.isContainerPrivileged());
            }

            return commandBuilder.withHostConfig(configuration).exec().getId();

        } catch (Exception e) {
            logger.error("Failed to create container", e);
            throw new KuraException(KuraErrorCode.PROCESS_EXECUTION_ERROR);
        } finally {
            if (!isNull(commandBuilder)) {
                commandBuilder.close();
            }
        }
    }

    private HostConfig containerLogConfigurationHandler(ContainerConfiguration containerDescription,
            HostConfig configuration) {
        LoggingType lt;
        switch (containerDescription.getContainerLoggingType().toUpperCase().trim()) {
        case "NONE":
            lt = LoggingType.NONE;
            break;
        case "LOCAL":
            lt = LoggingType.LOCAL;
            break;
        case "ETWLOGS":
            lt = LoggingType.ETWLOGS;
            break;
        case "JSON_FILE":
            lt = LoggingType.JSON_FILE;
            break;
        case "SYSLOG":
            lt = LoggingType.SYSLOG;
            break;
        case "JOURNALD":
            lt = LoggingType.JOURNALD;
            break;
        case "GELF":
            lt = LoggingType.GELF;
            break;
        case "FLUENTD":
            lt = LoggingType.FLUENTD;
            break;
        case "AWSLOGS":
            lt = LoggingType.AWSLOGS;
            break;
        case "DB":
            lt = LoggingType.DB;
            break;
        case "SPLUNK":
            lt = LoggingType.SPLUNK;
            break;
        case "GCPLOGS":
            lt = LoggingType.GCPLOGS;
            break;
        case "LOKI":
            lt = LoggingType.LOKI;
            break;
        default:
            lt = LoggingType.DEFAULT;
            break;
        }

        LogConfig lc = new LogConfig(lt, containerDescription.getLoggerParameters());

        configuration.withLogConfig(lc);

        return configuration;
    }

    private HostConfig containerNetworkConfigurationHandler(ContainerConfiguration containerDescription,
            HostConfig configuration) {

        Optional<String> networkMode = containerDescription.getContainerNetworkConfiguration().getNetworkMode();
        if (networkMode.isPresent() && !networkMode.get().trim().isEmpty()) {
            configuration.withNetworkMode(networkMode.get().trim());
        }

        return configuration;
    }

    private HostConfig containerMemoryConfigurationHandler(ContainerConfiguration containerDescription,
            HostConfig configuration) {

        Optional<Long> memory = containerDescription.getMemory();
        if (memory.isPresent()) {
            try {
                configuration.withMemory(memory.get());
            } catch (NumberFormatException e) {
                logger.warn("Memory value {} not valid. Caused by {}", memory.get(), e);
            }
        }

        return configuration;
    }

    private HostConfig containerCpusConfigurationHandler(ContainerConfiguration containerDescription,
            HostConfig configuration) {

        Optional<Float> cpus = containerDescription.getCpus();
        cpus.ifPresent(cpu -> {
            configuration.withCpuPeriod(100000L);
            configuration.withCpuQuota((long) (100000L * cpus.get()));
        });

        return configuration;
    }

    private HostConfig containerGpusConfigurationHandler(ContainerConfiguration containerDescription,
            HostConfig configuration) {

        Optional<String> gpus = containerDescription.getGpus();
        gpus.ifPresent(gpu -> configuration.withDeviceRequests(ImmutableList
                .of(new DeviceRequest().withDriver("nvidia").withCount(gpu.equals("all") ? -1 : Integer.parseInt(gpu))
                        .withCapabilities(ImmutableList.of(ImmutableList.of("gpu"))))));

        return configuration;
    }

    private HostConfig containerRuntimeConfigurationHandler(ContainerConfiguration containerDescription,
            HostConfig configuration) {

        Optional<String> runtime = containerDescription.getRuntime();
        runtime.ifPresent(configuration::withRuntime);

        return configuration;
    }

    private HostConfig containerPortManagementHandler(ContainerConfiguration containerConfiguration,
            HostConfig hostConfig, CreateContainerCmd commandBuilder) {

        List<ExposedPort> exposedPorts = new LinkedList<>();
        Ports portbindings = new Ports();

        if (containerConfiguration.getContainerPorts() != null
                && !containerConfiguration.getContainerPorts().isEmpty()) {

            List<org.eclipse.kura.container.orchestration.ContainerPort> containerPorts = containerConfiguration
                    .getContainerPorts();

            for (org.eclipse.kura.container.orchestration.ContainerPort port : containerPorts) {

                InternetProtocol ipProtocol = InternetProtocol.TCP;

                if (port.getInternetProtocol() != null) {
                    try {
                        ipProtocol = InternetProtocol.parse(port.getInternetProtocol().toString());
                    } catch (IllegalArgumentException e) {
                        logger.warn("Invalid internet protocol: {}. Using TCP.", port.getInternetProtocol());
                    }
                }

                ExposedPort exposedPort = new ExposedPort(port.getInternalPort(), ipProtocol);
                exposedPorts.add(exposedPort);
                portbindings.bind(exposedPort, Binding.bindPort(port.getExternalPort()));
            }

            if (exposedPorts.size() != portbindings.getBindings().size()) {
                logger.error("portsExternal and portsInternal must have the same size: {}",
                        containerConfiguration.getContainerName());
            }
        }

        hostConfig.withPortBindings(portbindings);
        commandBuilder.withExposedPorts(exposedPorts);

        return hostConfig;
    }

    private CreateContainerCmd containerEnviromentVariablesHandler(ContainerConfiguration containerDescription,
            CreateContainerCmd commandBuilder) {

        if (containerDescription.getContainerEnvVars().isEmpty()) {
            return commandBuilder;
        }

        if (containerDescription.getContainerEnvVars() != null
                && !containerDescription.getContainerEnvVars().isEmpty()) {
            List<String> formattedEnvVars = new LinkedList<>();
            for (String env : containerDescription.getContainerEnvVars()) {
                if (!env.trim().isEmpty()) {
                    formattedEnvVars.add(env.trim());
                }
            }
            commandBuilder = commandBuilder.withEnv(formattedEnvVars);
        }

        return commandBuilder;

    }

    private CreateContainerCmd containerEntrypointHandler(ContainerConfiguration containerDescription,
            CreateContainerCmd commandBuilder) {

        if (containerDescription.getEntryPoint().isEmpty() || containerDescription.getEntryPoint() == null) {
            return commandBuilder;
        }

        return commandBuilder.withEntrypoint(containerDescription.getEntryPoint());

    }

    private HostConfig containerVolumeMangamentHandler(ContainerConfiguration containerDescription,
            HostConfig hostConfiguration) {

        if (containerDescription.getContainerVolumes().isEmpty()) {
            return hostConfiguration;
        }

        List<Bind> bindsToAdd = new LinkedList<>();

        if (containerDescription.getContainerVolumes() != null
                && !containerDescription.getContainerVolumes().isEmpty()) {

            for (Map.Entry<String, String> element : containerDescription.getContainerVolumes().entrySet()) {
                // source: path on host (key)
                // destination: path in container (value)
                if (!element.getKey().isEmpty() && !element.getValue().isEmpty()) {
                    Volume tempVolume = new Volume(element.getValue());
                    Bind tempBind = new Bind(element.getKey(), tempVolume);
                    bindsToAdd.add(tempBind);
                }
            }
            hostConfiguration = hostConfiguration.withBinds(bindsToAdd);

        }

        return hostConfiguration;

    }

    private HostConfig containerDevicesHandler(ContainerConfiguration containerDescription,
            HostConfig hostConfiguration) {

        if (containerDescription.getContainerDevices().isEmpty()) {
            return hostConfiguration;
        }

        if (containerDescription.getContainerDevices() != null
                && !containerDescription.getContainerDevices().isEmpty()) {

            List<Device> deviceList = new LinkedList<>();
            for (String deviceString : containerDescription.getContainerDevices()) {
                deviceList.add(Device.parse(deviceString));
            }
            if (!deviceList.isEmpty()) {
                hostConfiguration = hostConfiguration.withDevices(deviceList);

            }
        }

        return hostConfiguration;

    }

    private boolean doesImageExist(String imageName, String imageTag) {

        if (!testConnection()) {
            throw new IllegalStateException(UNABLE_TO_CONNECT_TO_DOCKER_CLI);
        }

        List<Image> images = this.dockerClient.listImagesCmd().exec();

        String requiredImage = imageName + ":" + imageTag;

        for (Image image : images) {
            if (isNull(image.getRepoTags())) {
                continue;
            }
            for (String tag : image.getRepoTags()) {
                if (tag.equals(requiredImage)) {
                    return true;
                }
            }
        }

        return false;
    }

    private void cleanUpDocker() {

        if (this.allowlistEnforcementMonitor != null) {
            closeEnforcementMonitor();
        }

        if (testConnection()) {
            this.dockerServiceListeners.forEach(ContainerOrchestrationServiceListener::onDisabled);
            disconnect();
        }
    }

    private boolean connect() {
        if (this.currentConfig.getHostUrl() == null) {
            return false;
        }
        DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
                .withDockerHost(this.currentConfig.getHostUrl()).build();

        DockerHttpClient httpClient = new ApacheDockerHttpClient.Builder().dockerHost(config.getDockerHost()).build();

        this.dockerClient = DockerClientImpl.getInstance(config, httpClient);

        final boolean connected = testConnection();

        if (connected) {
            this.dockerServiceListeners.forEach(ContainerOrchestrationServiceListener::onConnect);
        }
        return connected;
    }

    private void disconnect() {
        if (testConnection()) {
            try {
                this.dockerServiceListeners.forEach(ContainerOrchestrationServiceListener::onDisconnect);
                this.dockerClient.close();
            } catch (IOException e) {
                logger.error("Error disconnecting", e);
            }
        }
    }

    private boolean testConnection() {
        boolean canConnect = false;
        try {
            this.dockerClient.pingCmd().exec();
            canConnect = true;
        } catch (Exception ex) {
            canConnect = false;
        }
        return canConnect;
    }

    private static class FrameworkManagedContainer {

        private final String name;
        private final String id;

        public FrameworkManagedContainer(String name, String id) {
            this.name = name;
            this.id = id;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.id, this.name);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FrameworkManagedContainer other = (FrameworkManagedContainer) obj;
            return Objects.equals(this.id, other.id) && Objects.equals(this.name, other.name);
        }

    }

    @Override
    public void pullImage(ImageConfiguration imageConfig) throws KuraException, InterruptedException {
        if (isNull(imageConfig.getImageName()) || isNull(imageConfig.getImageTag())
                || imageConfig.getimageDownloadTimeoutSeconds() < 0 || isNull(imageConfig.getRegistryCredentials())) {
            throw new IllegalArgumentException("Parameters cannot be null or negative");
        }

        boolean imageAvailableLocally = doesImageExist(imageConfig.getImageName(), imageConfig.getImageTag());

        if (!imageAvailableLocally) {
            try {
                imagePullHelper(imageConfig.getImageName(), imageConfig.getImageTag(),
                        imageConfig.getimageDownloadTimeoutSeconds(), imageConfig.getRegistryCredentials());
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                logger.error("Cannot pull container. Caused by ", e);
                throw new KuraException(KuraErrorCode.IO_ERROR, "Unable to pull container");
            }
        }
    }

    @Override
    public void pullImage(String imageName, String imageTag, int timeOutSeconds,
            Optional<RegistryCredentials> registryCredentials) throws KuraException, InterruptedException {
        pullImage(new ImageConfiguration.ImageConfigurationBuilder().setImageName(imageName).setImageTag(imageTag)
                .setImageDownloadTimeoutSeconds(timeOutSeconds).setRegistryCredentials(registryCredentials).build());
    }

    @Override
    public List<ImageInstanceDescriptor> listImageInstanceDescriptors() {
        if (!testConnection()) {
            throw new IllegalStateException(UNABLE_TO_CONNECT_TO_DOCKER_CLI);
        }
        List<Image> images = this.dockerClient.listImagesCmd().withShowAll(true).exec();
        List<ImageInstanceDescriptor> result = new ArrayList<>();
        images.forEach(image -> {
            InspectImageResponse iir = this.dockerClient.inspectImageCmd(image.getId()).exec();

            ImageInstanceDescriptorBuilder imageBuilder = ImageInstanceDescriptor.builder()
                    .setImageName(getImageName(image)).setImageTag(getImageTag(image)).setImageId(image.getId())
                    .setImageAuthor(iir.getAuthor()).setImageArch(iir.getArch())
                    .setimageSize(iir.getSize().longValue());

            if (image.getLabels() != null) {
                imageBuilder.setImageLabels(image.getLabels());
            }

            result.add(imageBuilder.build());
        });
        return result;
    }

    private String getImageName(Image image) {
        if (image.getRepoTags() == null || image.getRepoTags().length < 1) {
            return "";
        }

        return image.getRepoTags()[0].split(":")[0];
    }

    private String getImageTag(Image image) {
        if (image.getRepoTags() == null || image.getRepoTags().length < 1
                || image.getRepoTags()[0].split(":").length < 2) {
            return "";
        }

        return image.getRepoTags()[0].split(":")[1];
    }

    @Override
    public void deleteImage(String imageId) throws KuraException {
        checkRequestEnv(imageId);
        try {
            this.dockerClient.removeImageCmd(imageId).exec();
        } catch (Exception e) {
            logger.error("Could not remove image {}. Caused by {}", imageId, e);
            throw new KuraException(KuraErrorCode.OS_COMMAND_ERROR, "Delete Container Image",
                    "500 (server error). Image is most likely in use by a container.");
        }
    }

    public Set<String> getImageDigestsByContainerId(String containerId) {

        Set<String> imageDigests = new HashSet<>();

        String containerName = listContainerDescriptors().stream()
                .filter(container -> container.getContainerId().equals(containerId)).findFirst()
                .map(container -> container.getContainerName()).orElse(null);

        if (containerName == null) {
            return imageDigests;
        }

        dockerClient.listImagesCmd().withImageNameFilter(containerName).exec().stream().forEach(image -> {
            List<String> digests = Arrays.asList(image.getRepoDigests());
            digests.stream().forEach(digest -> imageDigests.add(digest.split("@")[1]));
        });

        return imageDigests;
    }

    private void addContainerInstanceDigest(String containerId, Optional<String> containerInstanceDigest) {

        if (containerInstanceDigest.isPresent()) {
            logger.info(
                    "Container {} presented enforcement digest. Adding it to the digests allowlist: it will be used if the enforcement is enabled.",
                    containerId);
            this.containerInstancesDigests.put(containerId, containerInstanceDigest.get());
        } else {
            logger.info("Container {} doesn't contain the enforcement digest. "
                    + "If enforcement is enabled, be sure that the digest is included in the Orchestration Service allowlist",
                    containerId);
        }

    }

    private void removeContainerInstanceDigest(String containerId) {
        if (this.containerInstancesDigests.containsKey(containerId)) {
            this.containerInstancesDigests.remove(containerId);
            logger.info("Removed digest of container with ID {} from Container Instances Allowlist", containerId);
            enforceAlreadyRunningContainer();
        }
    }

    public Set<String> getContainerInstancesAllowlist() {
        return new HashSet<>(this.containerInstancesDigests.values());
    }

}
